KFP SDK v2 is the current focus of the Kubeflow community, so it will keep receiving updates, improvements, and support. KFP v1 is legacy and will receive no further updates or support.
In this short blog post I will present some of the changes:
Environment variables
Instead of:
ENV_VARIABLE = V1EnvVar(name="ENV_VARIABLE", value="test_value")
op = print_env_variable().add_env_variable(ENV_VARIABLE)
Do:
op = print_env_variable().set_env_variable(name="ENV_VARIABLE", value="test_value")
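The post does not show the body of print_env_variable, so the following is only a sketch of what such a component might look like. It is left undecorated so it can run without KFP installed, and setting os.environ by hand is an assumption that stands in for what set_env_variable arranges inside the container.

```python
import os


def print_env_variable() -> str:
    # In a real pipeline this function would carry the @dsl.component decorator.
    # Lightweight components need their imports inside the function body.
    import os

    value = os.environ.get("ENV_VARIABLE", "<not set>")
    print(value)
    return value


# Simulates the variable that set_env_variable(name=..., value=...) would expose.
os.environ["ENV_VARIABLE"] = "test_value"
print_env_variable()
```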
Pod labels
Instead of:
op = print_op().add_pod_label("label", "value")
Do:
from kfp import kubernetes  # provided by the separate kfp-kubernetes package
op = print_op()
kubernetes.add_pod_label(op, "label", "value")
Pod annotations
Instead of:
op = print_op().add_pod_annotation("some-key", "some value")
Do:
from kfp import kubernetes
op = print_op()
kubernetes.add_pod_annotation(op, "some-key", "some value")
ParallelFor
Instead of:
with dsl.ParallelFor(loop_args=ids, parallelism=0) as id:
    print_op(text=id)
Do:
with dsl.ParallelFor(items=ids, parallelism=0) as id:
    print_op(text=str(id))
For a ParallelFor nested inside another ParallelFor, the old way was:
with dsl.ParallelFor(loop_args=asins, parallelism=0) as asin:
    with dsl.ParallelFor(loop_args=trials, parallelism=0) as trial:
        print_op(asin=asin, trial=trial)
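Now you can replace the inner loop with a plain Python for loop, provided trials is a regular Python list that is known when the pipeline is compiled:
with dsl.ParallelFor(items=asins, parallelism=0) as asin:
    for trial in trials:
        print_op(asin=str(asin), trial=str(trial))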
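To see why the inner loop needs a real Python list, here is a plain-Python illustration with no KFP involved. The names record_task and outer_loop_body are hypothetical stand-ins for a component call and for the body of the outer dsl.ParallelFor block. The point is that Python executes the body once while the pipeline is compiled, so the inner for loop is unrolled into one task definition per element.

```python
# Plain-Python stand-in, no KFP involved. record_task is a hypothetical
# placeholder for a component call such as print_op(...).
created_tasks = []


def record_task(asin, trial):
    created_tasks.append((asin, trial))


trials = [1, 2, 3]  # a regular Python list, available at compile time


def outer_loop_body(asin):
    # Stands in for the body of the outer dsl.ParallelFor block, which Python
    # executes once while the pipeline is being compiled.
    for trial in trials:
        record_task(asin=asin, trial=trial)


outer_loop_body("<asin placeholder>")
print(len(created_tasks))  # one task definition per element of trials
```

If trials were only available at runtime, for example as the output of another task, a Python for loop would have nothing to iterate over at compile time, so this shortcut would not apply.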
dsl.Condition is now dsl.If
Example:
from kfp import dsl
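with dsl.If(condition=evaluate_task.output > 0.90, name="accuracy > 0.90"):
    # tasks placed inside the block run only when the condition holds
    print_op(text="accuracy is above 0.90")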
Make sure that all component calls use named (keyword) arguments
Before:
op = print_op("some text")
Now:
op = print_op(text="some text")
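In a larger codebase it can help to find positional calls automatically. The following is a minimal sketch using only the standard library ast module. The function name find_positional_calls and the idea of passing in a set of component names are assumptions made for illustration, not part of KFP. It only catches direct calls by name, such as print_op(...).

```python
import ast


def find_positional_calls(source, component_names):
    """Return (line, name) for each call to a listed component that passes positional arguments."""
    hits = []
    for node in ast.walk(ast.parse(source)):
        if (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Name)
            and node.func.id in component_names
            and node.args  # non-empty means at least one positional argument
        ):
            hits.append((node.lineno, node.func.id))
    return sorted(hits)


sample = 'op = print_op("some text")\nop = print_op(text="some text")\n'
for line, name in find_positional_calls(sample, {"print_op"}):
    print(f"line {line}: {name} is called with positional arguments")
```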
Output of a component with a return value
If your component returns a value like a regular Python function:
from kfp.dsl import component
@component()
def my_component() -> int:
    return 5
Then you use that output in another component through the output attribute of the task:
my_task = my_component()
my_task.output
Before, you could also write the following (it now fails):
my_task.outputs["output"]
Attach secrets as environment variables
Before:
from kfp.onprem import use_k8s_secret
minio_comp().apply(
    use_k8s_secret(
        secret_name="mlpipeline-minio-artifact",
        k8s_secret_key_to_env={
            "accesskey": "ACCESS_KEY",
            "secretkey": "SECRET_KEY",
        },
    )
)
Now:
from kfp import kubernetes
list_op = minio_comp()
kubernetes.use_secret_as_env(
    list_op,
    secret_name="mlpipeline-minio-artifact",
    secret_key_to_env={
        "accesskey": "ACCESS_KEY",
        "secretkey": "SECRET_KEY",
    },
)
Attach a PVC volume to a component
Before:
train_op = train().add_pvolumes({"/mnt": dsl.PipelineVolume(pvc=volume_name)})
Now:
from kfp import kubernetes
# the mount path ("/mnt") stays the same as in the v1 version
train_op = train()
kubernetes.mount_pvc(
    task=train_op, pvc_name=volume_name, mount_path="/mnt"
)
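To locate the places that still use the v1 style, a small scanner can help. This is a rough sketch, not an official migration tool: the pattern table V1_PATTERNS and the function find_v1_usages are hypothetical names, and the patterns cover only the unambiguous v1 spellings from this post. The pod label and annotation methods are left out because the v2 helpers in kfp.kubernetes have similar names and would produce false matches.

```python
import re

# v1-only spellings from this post, mapped to the v2 replacement to consider.
V1_PATTERNS = {
    r"\.add_env_variable\(": "task.set_env_variable(name=..., value=...)",
    r"\bloop_args\s*=": "dsl.ParallelFor(items=...)",
    r"\bdsl\.Condition\(": "dsl.If(...)",
    r"\bkfp\.onprem\b": "kubernetes.use_secret_as_env(...)",
    r"\.add_pvolumes\(": "kubernetes.mount_pvc(...)",
}


def find_v1_usages(source):
    """Return (line_number, suggestion) pairs for lines that match a v1-only pattern."""
    findings = []
    for line_number, line in enumerate(source.splitlines(), start=1):
        for pattern, suggestion in V1_PATTERNS.items():
            if re.search(pattern, line):
                findings.append((line_number, suggestion))
    return findings


sample = "\n".join([
    "op = print_env_variable().add_env_variable(ENV_VARIABLE)",
    "with dsl.ParallelFor(items=ids, parallelism=0) as id:",
    "from kfp.onprem import use_k8s_secret",
])
for line_number, suggestion in find_v1_usages(sample):
    print(f"line {line_number}: consider {suggestion}")
```

A text-based scan like this can miss aliased imports or flag matches inside comments, so treat the output as a list of places to review by hand.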